/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.search.service.impl;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.MultiSearchResponse.Item;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.TypeMissingException;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.search.configuration.SearchConfigurationConstants;
import org.unidata.mdm.search.context.AggregationSearchContext;
import org.unidata.mdm.search.context.ComplexSearchRequestContext;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.type.sort.SortField;
import org.unidata.mdm.search.util.SearchUtils;
import org.unidata.mdm.system.type.annotation.ConfigurationRef;
import org.unidata.mdm.system.type.configuration.ConfigurationValue;

/**
 * Search component. Builds Elasticsearch search requests from search contexts
 * and executes them (single, multi and page-by-page searches, plus counting).
 *
 * @author Mikhail Mikhailov
 */
@Component
public class SearchComponentImpl extends BaseComponentImpl {

    /**
     * Name of the sort field denoting the relevance score.
     */
    private static final String SCORE_SORT = "_score";
    /**
     * The logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(SearchComponentImpl.class);
    /**
     * Default min score for search.
     * Note: this field is not assigned by the one-arg constructor.
     */
    @ConfigurationRef(SearchConfigurationConstants.PROPERTY_DEFAULT_MIN_SCORE)
    private ConfigurationValue<Double> defaultMinScore;

    /**
     * Elasticsearch client to use.
     */
    @Autowired
    private Client client;

    /**
     * Empty args constructor, used by container.
     */
    public SearchComponentImpl() {
        super();
    }

    /**
     * One arg constructor, used by utility.
     *
     * @param client the client to run requests with
     */
    public SearchComponentImpl(Client client) {
        super();
        this.client = client;
    }

    /**
     * Parameterized search entry point.
     *
     * @param ctx the context
     * @return {@linkplain SearchResponse} instance
     */
    public SearchResponse parameterizedSearch(final SearchRequestContext ctx) {
        final QueryBuilder query = createGeneralQueryFromContext(ctx, ctx.getNestedSearch());
        return executeRequest(prepareSearchRequest(ctx, query));
    }

    /**
     * Complex (hierarchical or multi) search entry point.
     *
     * @param searchRequest the complex request context
     * @return responses keyed by the context that produced them, or an empty map,
     *         if no queries could be created or the request type is not supported.
     *         A value may be {@code null} for a failed multi-search item.
     */
    public Map<SearchRequestContext, SearchResponse> parameterizedSearch(ComplexSearchRequestContext searchRequest) {

        final Map<SearchRequestContext, QueryBuilder> queries = createQueryForComplex(searchRequest);
        if (queries.isEmpty()) {
            return Collections.emptyMap();
        }

        final ComplexSearchRequestContext.ComplexSearchRequestType type = searchRequest.getType();
        if (type == ComplexSearchRequestContext.ComplexSearchRequestType.HIERARCHICAL) {
            return executeHierarchical(searchRequest, queries);
        }

        if (type == ComplexSearchRequestContext.ComplexSearchRequestType.MULTI) {
            return executeMulti(searchRequest, queries);
        }

        return Collections.emptyMap();
    }

    /**
     * Runs the main request first and then - if the main context joins by a field -
     * the supplementary requests that both join and return fields, restricted
     * to the join values found in the main hits.
     *
     * @param searchRequest the complex request context
     * @param queries prepared queries, keyed by context
     * @return responses keyed by context (always contains the main one)
     */
    private Map<SearchRequestContext, SearchResponse> executeHierarchical(
            ComplexSearchRequestContext searchRequest, Map<SearchRequestContext, QueryBuilder> queries) {

        final SearchRequestContext main = searchRequest.getMain();
        final SearchResponse result = executeRequest(prepareSearchRequest(main, queries.get(main)));

        // Supplementary requests are post-processed for return values only,
        // if the main request joins by some field.
        if (!main.hasJoinBy()) {
            return Collections.singletonMap(main, result);
        }

        final List<SearchRequestContext> returning = searchRequest.getSupplementary().stream()
                .filter(ctx -> ctx.hasJoinBy() && ctx.hasReturnFields())
                .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(returning)) {
            return Collections.singletonMap(main, result);
        }

        // Hits without the join field, or with a join field that carries no value, are skipped.
        final List<Object> joinValues = Arrays.stream(result.getHits().getHits())
                .map(h -> h.getFields().get(main.getJoinBy().getName()))
                .filter(Objects::nonNull)
                .map(DocumentField::getValue)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(joinValues)) {
            return Collections.singletonMap(main, result);
        }

        final MultiSearchRequestBuilder msrb = client.prepareMultiSearch()
                .setIndicesOptions(IndicesOptions.lenientExpandOpen());

        for (SearchRequestContext child : returning) {
            final SearchRequestBuilder srb = prepareSearchRequest(child, queries.get(child));
            // Replaces the post filter set by prepareSearchRequest with the join restriction.
            srb.setPostFilter(QueryBuilders.termsQuery(child.getJoinBy().getName(), joinValues));
            msrb.add(srb);
        }

        final Map<SearchRequestContext, SearchResponse> responses = new HashMap<>(returning.size() + 1);
        collectResponses(returning, executeRequest(msrb), responses);

        responses.put(main, result);
        return responses;
    }

    /**
     * Runs all supplementary requests as a single multi-search.
     *
     * @param searchRequest the complex request context
     * @param queries prepared queries, keyed by context
     * @return responses keyed by context
     */
    private Map<SearchRequestContext, SearchResponse> executeMulti(
            ComplexSearchRequestContext searchRequest, Map<SearchRequestContext, QueryBuilder> queries) {

        final MultiSearchRequestBuilder msrb = client.prepareMultiSearch()
                .setIndicesOptions(IndicesOptions.lenientExpandOpen());

        searchRequest.getSupplementary().forEach(ctx -> msrb.add(prepareSearchRequest(ctx, queries.get(ctx))));

        final Map<SearchRequestContext, SearchResponse> responses = new HashMap<>(searchRequest.getSupplementary().size());
        collectResponses(searchRequest.getSupplementary(), executeRequest(msrb), responses);

        return responses;
    }

    /**
     * Pairs multi-search items with their contexts positionally (items come back
     * in the order the requests were added) and stores them in the target map.
     * Failed items are logged and stored with a {@code null} response.
     *
     * @param contexts the contexts, in the order their requests were added
     * @param response the multi-search response
     * @param target the map to fill
     */
    private void collectResponses(Iterable<? extends SearchRequestContext> contexts,
            MultiSearchResponse response, Map<SearchRequestContext, SearchResponse> target) {

        final Item[] items = response.getResponses();
        int position = 0;
        for (SearchRequestContext ctx : contexts) {

            final Item item = items[position];
            if (item.isFailure()) {
                LOGGER.warn("Multi search item [{}] failed: {}", position, item.getFailureMessage());
            }

            target.put(ctx, item.getResponse());
            position++;
        }
    }

    /**
     * Extracts records page by page, starting with the page given by the context,
     * until a page comes back empty or with less hits than the requested count.
     *
     * @param ctx the context
     * @return list of responses, one per fetched page (at least one element)
     */
    public List<SearchResponse> parameterizedScrollScanSearch(final SearchRequestContext ctx) {

        final int pageSize = ctx.getCount();
        final SearchRequestBuilder srb
            = prepareSearchRequest(ctx, createGeneralQueryFromContext(ctx, ctx.getNestedSearch()));

        final List<SearchResponse> pages = new ArrayList<>();

        int from = ctx.getPage() * pageSize;
        SearchResponse page = executeRequest(srb);
        while (true) {

            pages.add(page);

            // An empty page always terminates the loop. Without this check
            // a zero count would never satisfy "fetched < pageSize" and loop forever.
            final int fetched = page.getHits().getHits().length;
            if (fetched == 0 || fetched < pageSize) {
                break;
            }

            from += pageSize;
            srb.setFrom(from);
            page = executeRequest(srb);
        }

        return pages;
    }

    /**
     * Returns number of all existing records for an entity.
     * Exact hit tracking is requested explicitly, because Elasticsearch 7
     * may otherwise report only a lower bound of the total.
     *
     * @param ctx the context
     * @return count, or 0, if the index or type does not exist or no total is reported
     */
    public long countAll(@Nonnull final SearchRequestContext ctx) {

        // 1. Compose the name of the index
        final String indexName = constructIndexName(ctx);
        try {

            TotalHits th = client.prepareSearch(indexName)
                    .setSize(0)
                    .setTrackTotalHits(true)
                    .setPostFilter(createPostFilterQuery(ctx.getType(), ctx.getFilter()))
                    .execute()
                    .actionGet()
                    .getHits().getTotalHits();

            if (Objects.nonNull(th)) {
                return th.value;
            }

        } catch (TypeMissingException | IndexNotFoundException tme) {
            LOGGER.warn(
                    "Count mapping failed. Type [{}] not found in index [{}]. Skipping.",
                    ctx.getType().getName(), indexName);
        }

        return 0;
    }

    /**
     * Prepare request method.
     *
     * @param ctx   context
     * @param query prepared query
     * @return request builder with paging, min score, preference, routing,
     *         post filter, sorts, search-after, aggregations and source filtering applied
     */
    private SearchRequestBuilder prepareSearchRequest(SearchRequestContext ctx, QueryBuilder query) {

        // 1. Compose the name of the index
        final String indexName = constructIndexName(ctx);

        // 2. Paging. The page is cut, if it crosses the max window.
        // The size is clamped at zero so an offset beyond the window never yields a negative size.
        final int count = ctx.getCount();
        final int from = ctx.getPage() * count;
        final int maxWindow = getMaxWindowSize();
        final int size = (from + count) > maxWindow
                ? Math.max(0, maxWindow - from)
                : count;

        final SearchRequestBuilder srb = client.prepareSearch(indexName)
                .setQuery(query)
                .setFrom(from)
                .setSize(size);

        // 3. Min score. Context value wins. The configured default is applied only if it is available
        // (it is not, when the component was created via the one-arg constructor).
        if (ctx.getScore() != null) {
            srb.setMinScore(ctx.getScore());
        } else if (defaultMinScore != null && defaultMinScore.getValue() != null) {
            srb.setMinScore(defaultMinScore.getValue().floatValue());
        }

        if (ctx.getShardNumber() != null) {
            srb.setPreference(Preference.SHARDS.type() + ":" + ctx.getShardNumber());
        }

        if (CollectionUtils.isNotEmpty(ctx.getRoutings())) {
            srb.setRouting(ctx.getRoutings().toArray(new String[0]));
        }

        if (ctx.isScoreEnabled()) {
            srb.setTrackScores(true);
            srb.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
        }

        srb.setPostFilter(createPostFilterQuery(ctx.getType(), ctx.getFilter()));

        addSorts(srb, ctx);

        // search after can be used only with from equals 0
        if (CollectionUtils.isNotEmpty(ctx.getSearchAfter())) {
            srb.searchAfter(ctx.getSearchAfter().toArray());
            srb.setFrom(0);
        }

        addAggregations(srb, ctx);

        srb.setFetchSource(SearchUtils.extractReturnFields(ctx), null);
        return srb;
    }

    /**
     * Adds aggregations to search request builder.
     * @param srb search request builder
     * @param ctx search request context
     */
    private void addAggregations(SearchRequestBuilder srb, SearchRequestContext ctx) {

        final Collection<AggregationSearchContext> aggregations = ctx.getAggregations();
        if (CollectionUtils.isEmpty(aggregations)) {
            return;
        }

        for (AggregationSearchContext aCtx : aggregations) {
            srb.addAggregation(createAggregation(aCtx));
        }
    }

    /**
     * Adds sorts from the context to the search request builder.
     * If score is enabled and no explicit score sort was supplied,
     * a descending score sort is appended last.
     *
     * @param srb SearchRequestBuilder
     * @param ctx search context
     */
    private void addSorts(SearchRequestBuilder srb, SearchRequestContext ctx) {

        boolean scoreSupplied = false;
        if (ctx.getSortFields() != null) {

            for (SortField sortField : ctx.getSortFields()) {

                scoreSupplied = scoreSupplied || SCORE_SORT.equals(sortField.getPath());

                final SortOrder sortOrder = sortField.getSortOrder() == SortField.SortOrder.ASC
                        ? SortOrder.ASC
                        : SortOrder.DESC;

                // Analyzed fields are sorted by their non-analyzed counterpart.
                final String sortFieldName = sortField.isAnalyzed()
                        ? stringNonAnalyzedField(sortField.getPath())
                        : sortField.getPath();

                srb.addSort(sortFieldName, sortOrder);
            }
        }

        if (ctx.isScoreEnabled() && !scoreSupplied) {
            srb.addSort(SCORE_SORT, SortOrder.DESC);
        }
    }
}